#include "config.h"

#include <stdbool.h>
#include <stddef.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <list.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"

#include "list.h"
#include "schedule.h"
#include "core.h"
#include "coroutine.h"
#include "volume_proto.h"

#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"

extern uint32_t lsv_feature;

/** @file API
 *
 * - Locking is done here, at the API layer; the internal helpers are lock-free.
 */

/**
 * Set up the write buffer of a volume.
 *
 * Outline of the work (from the original notes):
 * 1 mem alloc
 * 2 disk space alloc
 * 3 mem map with disk space
 *
 * Calls, in order: lsv_wbuffer_mem_init(), lsv_wbuffer_chunk_init(),
 * lsv_wb_hash_index_init_v2() and lsv_wal2_segment_init(). For
 * LSV_SYS_LOAD and LSV_SYS_RECOVERY it then calls lsv_wbuffer_load().
 *
 * @param lsv_info volume whose write buffer is being initialised
 * @param flag     LSV_SYS_CREATE, LSV_SYS_LOAD or LSV_SYS_RECOVERY; any
 *                 other value trips YASSERT(0)
 * @return always 0; a non-zero result from the hash-index or WAL2 segment
 *         initialisation trips YASSERT(0) and is not handed to the caller
 */
int lsv_wbuffer_init(lsv_volume_proto_t *lsv_info, lsv_u32_t flag) {
        int ret = 0;

        /* Layout check: page_idx must sit exactly LSV_CHUNK_SIZE bytes into
         * lsv_chunk_buf_t. offsetof() expresses this without null-pointer
         * arithmetic or void-pointer subtraction. */
        assert(offsetof(lsv_chunk_buf_t, page_idx) == LSV_CHUNK_SIZE);

        lsv_wbuffer_mem_init(lsv_info);

        lsv_wbuffer_chunk_init(lsv_info);
        ret = lsv_wb_hash_index_init_v2(lsv_info);
        if (ret) {
                DERROR("lsv_wb_hash_index_init failed\n");
                YASSERT(0);
        }

        /* WAL2 segment set-up runs unconditionally; it is not gated on
         * LSV_FEATURE_WAL2. */
        ret = lsv_wal2_segment_init(lsv_info);
        if (unlikely(ret))
                YASSERT(0);

        if (flag == LSV_SYS_CREATE) {
                /* fresh volume: nothing to load */
        } else if (flag == LSV_SYS_LOAD || flag == LSV_SYS_RECOVERY) {
                lsv_wbuffer_load(lsv_info);
        } else {
                YASSERT(0);
        }

        return 0;
}

/**
 * Tear down the write buffer of a volume.
 *
 * Order of operations: destroy the chunks, destroy the hash index if one
 * is attached, destroy wb_hash_rwlock, then release lsv_info->wbuf through
 * yfree().
 *
 * @param lsv_info volume whose write buffer is being destroyed
 * @return always 0
 */
int lsv_wbuffer_destroy(lsv_volume_proto_t *lsv_info) {
        /* Captured before any teardown call runs. */
        lsv_wbuf_t *wb = lsv_info->wbuf;

        /* chunk memory first */
        lsv_wbuffer_chunk_destroy(lsv_info);

        /* then the hash index, only if one was set up */
        if (wb->wb_hash_index != NULL) {
                lsv_wb_hash_index_destroy_v2(lsv_info);
        }

        lsv_rwlock_destroy(&wb->wb_hash_rwlock);

        yfree((void **)&lsv_info->wbuf);

        return 0;
}

/**
 * Check whether both the write buffer and the WAL can accept an IO.
 *
 * Original design notes: distinguish "check" from "malloc"; malloc has to
 * check again (double check) because the state may have changed in between.
 * Both resources are checked, and the check must not be interrupted.
 *
 * Defined without `inline` so that this translation unit always provides
 * an external definition of the function.
 *
 * @param lsv_info volume being written to
 * @param io       IO to be admitted; io->offset and io->size are read
 * @param pre      out: wbuf_is_ok receives the result of
 *                 lsv_wbuffer_chunk_check(); wal_is_ok receives the result
 *                 of lsv_wal2_segment_check(), or 1 when LSV_FEATURE_WAL2
 *                 is not set in lsv_feature
 * @return 1 when both flags are non-zero, 0 otherwise
 */
int lsv_wbuffer_check_io_write_pre(lsv_volume_proto_t *lsv_info, const io_t *io, check_io_write_pre_t *pre) {
        /* WAL length checked for this IO: head length plus payload size. */
        int wal_bytes = wal_io_head_len() + io->size;

        pre->wbuf_is_ok = lsv_wbuffer_chunk_check(lsv_info, io->offset, io->size);

        if (lsv_feature & LSV_FEATURE_WAL2) {
                pre->wal_is_ok = lsv_wal2_segment_check(lsv_info, wal_bytes, NULL);
        } else {
                pre->wal_is_ok = 1;
        }

        return (pre->wbuf_is_ok && pre->wal_is_ok) ? 1 : 0;
}

/**
 * Loop until lsv_wbuffer_check_io_write_pre() reports that both the write
 * buffer and the WAL are ready for the given IO.
 *
 * Open question from the original notes: should small IOs get a higher
 * priority?
 *
 * @param lsv_info volume being written to
 * @param io       IO waiting for admission
 * @return always 0
 */
int lsv_wbuffer_wait_io_write_pre(lsv_volume_proto_t *lsv_info, const io_t *io) {
        check_io_write_pre_t status;
        lsv_wbuf_t *wb = lsv_info->wbuf;

        /* check stage: repeated after every expand attempt below */
        while (!lsv_wbuffer_check_io_write_pre(lsv_info, io, &status)) {
                /* expand stage */
                if (!status.wbuf_is_ok) {
                        DINFO_NOP("wait\n");
                        /* fixed allocation: wait on full_cond */
                        co_cond_wait2(&wb->full_cond, "wait_wbuf");
                }

                if (!status.wal_is_ok) {
                        /* dynamic allocation */
                        lsv_wal2_segment_ensure_free_list(lsv_info, 1);
                }
        }

        return 0;
}

/**
 * Append one page to the write buffer.
 *
 * The request is rejected with EINVAL unless both io_opt->lba and
 * io_opt->lba + io_opt->size are multiples of LSV_WBUF_PAGE_SIZE. The page
 * is then handed to lsv_wbuffer_chunk_append_page() with a fresh option
 * block built from io_opt->lsn, io_opt->lba and LSV_PAGE_SIZE.
 *
 * NOTE(review): alignment is checked against LSV_WBUF_PAGE_SIZE while the
 * appended length is LSV_PAGE_SIZE and the success value is io_opt->size;
 * confirm that callers always pass exactly one page.
 *
 * @param lsv_info volume being written to
 * @param io_opt   lsn, lba and size of the request
 * @param page_buf buffer forwarded to lsv_wbuffer_chunk_append_page()
 * @return io_opt->size on success; on failure the error code, negated when
 *         it is positive
 */
int lsv_wbuffer_append_page(lsv_volume_proto_t *lsv_info, const lsv_io_opt_t *io_opt, buffer_t *page_buf) {
        int ret = 0;
        lsv_io_opt_t page_opt;

        if ((io_opt->lba % LSV_WBUF_PAGE_SIZE) != 0 ||
            ((io_opt->lba + io_opt->size) % LSV_WBUF_PAGE_SIZE) != 0) {
                DERROR("offset or size not part of page: %llu %d\n", (LLU)io_opt->lba, io_opt->size);
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        lsv_io_opt_init(&page_opt, io_opt->lsn, io_opt->lba, LSV_PAGE_SIZE);

        ret = lsv_wbuffer_chunk_append_page(lsv_info, &page_opt, page_buf);
        if (ret != 0) {
                DERROR("lsv_wbuffer_append_page failed, ret: %d\n", ret);
                GOTO(err_ret, ret);
        }

        return io_opt->size;
err_ret:
        /* Positive error codes are reported to the caller as negatives. */
        return (ret > 0) ? -ret : ret;
}

/**
 * Forward a read request to lsv_wb_hash_index_lookup_v2() on this volume's
 * write-buffer hash index.
 *
 * @param lsv_info   volume to read from
 * @param offset     offset passed through to the lookup
 * @param size       size passed through to the lookup
 * @param append_buf buffer passed through to the lookup
 * @return whatever lsv_wb_hash_index_lookup_v2() returns
 */
int lsv_wbuffer_page_read(lsv_volume_proto_t *lsv_info, lsv_u64_t offset, lsv_u32_t size, buffer_t *append_buf) {
        return lsv_wb_hash_index_lookup_v2(lsv_info->wbuf->wb_hash_index,
                                           offset, size, append_buf);
}

/**
 * Flush the write buffer; this function takes no lock itself.
 *
 * Three stages, in order:
 *  1. wait on io_cond until lsv_info->io_stream.list is empty;
 *  2. call lsv_wbuffer_do_flush();
 *  3. wait on flush_cond until both pipeline queues (log and bitmap) are
 *     empty.
 *
 * @param lsv_info volume whose write buffer is flushed
 * @return always 0
 */
int lsv_wbuffer_flush_unlock(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wb = lsv_info->wbuf;

        /* stage 1: outstanding IO */
        DINFO("wait io, %u\n", lsv_info->io_stream.count);
        while (!list_empty_careful(&lsv_info->io_stream.list)) {
                co_cond_wait(&wb->io_cond);
        }

        /* stage 2: the buffer itself */
        DINFO("wait wbuf\n");
        lsv_wbuffer_do_flush(lsv_info);

        /* stage 3: keep waiting while either pipeline queue has entries */
        DINFO("wait pipeline, log %u bitmap %u\n",
              wb->pipeline.log_queue.count,
              wb->pipeline.bitmap_queue.count);
        while (!(list_empty_careful(&wb->pipeline.log_queue.list) &&
                 list_empty_careful(&wb->pipeline.bitmap_queue.list))) {
                co_cond_wait(&wb->flush_cond);
        }

        DINFO("flush finished\n");

        /* TODO: the index must not be updated here; updating it would
         * change page_idx and related state.
         * lsv_wb_hash_index_regc_all(lsv_info); */

        return 0;
}

/**
 * Flush the write buffer while holding lsv_info->io_lock for writing.
 *
 * Design notes: if IO were allowed in during a flush, the lock-free
 * internals would misbehave and the page allocation rules would be broken.
 * Writes do not conflict with each other; a write and a flush do.
 * An alternative would be a multi-buffer scheme in which incoming IO
 * switches to another buffer, making the path lock free and wait free.
 *
 * @param lsv_info volume whose write buffer is flushed
 * @return the result of lsv_wbuffer_flush_unlock()
 */
int lsv_wbuffer_flush(lsv_volume_proto_t *lsv_info) {
        int ret;

        lsv_wrlock(&lsv_info->io_lock);

        /* Hand the flush result to the caller rather than discarding it. */
        ret = lsv_wbuffer_flush_unlock(lsv_info);

        lsv_unlock(&lsv_info->io_lock);

        return ret;
}

/**
 * Clean the whole write-buffer hash index and reset the "last position"
 * bookkeeping; this function takes no lock itself.
 *
 * @param lsv_info volume whose write buffer is cleaned
 * @return always 0
 */
int lsv_wbuffer_clean_unlock(lsv_volume_proto_t *lsv_info) {
        /* Captured before the index is cleaned. */
        lsv_wbuf_t *wb = lsv_info->wbuf;

        lsv_wb_hash_index_clean_all(lsv_info);

        /* No last chunk / page recorded any more. */
        wb->last_page_idx = -1;
        wb->last_chunk = NULL;

        return 0;
}

/**
 * Load the write-buffer state by calling lsv_wal2_segment_load().
 *
 * @param lsv_info volume being loaded
 * @return always 0; a non-zero result from lsv_wal2_segment_load() trips
 *         YASSERT(0) and is not handed to the caller
 */
int lsv_wbuffer_load(lsv_volume_proto_t *lsv_info) {
        int rc = lsv_wal2_segment_load(lsv_info);

        if (unlikely(rc != 0)) {
                YASSERT(0);
        }

        return 0;
}
